---
title: Workflow System Overview | Kastrax Docs
description: Overview of the workflow system in Kastrax, detailing how to create, configure, and execute workflows.
---

# Workflow System Overview ✅

The Kastrax workflow system allows you to define, execute, and monitor complex sequences of operations involving agents and other components. This guide explains the workflow system architecture and how to use it effectively.

## What Are Workflows? ✅

Workflows in Kastrax are structured sets of steps that run in dependency order, with data flowing from one step to the next. They enable you to:

- Orchestrate multiple agents to collaborate on complex tasks
- Define conditional execution paths based on intermediate results
- Process and transform data between steps
- Handle errors and retries
- Monitor and track execution progress
- Create reusable patterns for common agent interactions

## Workflow System Architecture ✅

The Kastrax workflow system consists of several key components:

1. **Workflow**: The top-level container that defines a sequence of steps
2. **WorkflowStep**: Individual units of work within a workflow
3. **WorkflowContext**: Shared state and data passed between steps
4. **WorkflowEngine**: The runtime that executes workflows
5. **WorkflowBuilder**: DSL for creating workflows
6. **WorkflowStateStorage**: Storage for workflow execution state

## Creating Basic Workflows ✅

Kastrax provides a DSL for creating workflows:

```kotlin
import ai.kastrax.core.workflow.builder.workflow
import ai.kastrax.core.workflow.engine.WorkflowEngine
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create agents for the workflow
    val researchAgent = agent {
        name("ResearchAgent")
        description("An agent that researches topics")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val writingAgent = agent {
        name("WritingAgent")
        description("An agent that writes content based on research")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    // Create a workflow
    val researchWorkflow = workflow("research-workflow", "Research and write an article") {
        // Research step
        step(researchAgent) {
            id = "research"
            name = "Research"
            description = "Research the topic"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic")
            )
        }
        
        // Writing step
        step(writingAgent) {
            id = "writing"
            name = "Writing"
            description = "Write an article based on research"
            after("research")
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text")
            )
        }
        
        // Define output mapping
        output {
            "researchResults" from "$.steps.research.output.text"
            "article" from "$.steps.writing.output.text"
            "wordCount" from {
                "$.steps.writing.output.text" to { text ->
                    (text as? String)?.split(" ")?.size ?: 0
                }
            }
        }
    }
    
    // Execute the workflow
    val workflowEngine = WorkflowEngine()
    workflowEngine.registerWorkflow("research-workflow", researchWorkflow)
    
    val result = workflowEngine.executeWorkflow(
        workflowId = "research-workflow",
        input = mapOf("topic" to "Artificial Intelligence")
    )
    
    println("Workflow result: ${result.output}")
}
```

## Workflow Components ✅

### Workflow ✅

The `Workflow` interface defines the core functionality of a workflow:

```kotlin
interface Workflow {
    suspend fun execute(
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): WorkflowResult

    suspend fun streamExecute(
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): Flow<WorkflowStatusUpdate>
}
```

### WorkflowStep ✅

The `WorkflowStep` interface defines individual steps within a workflow:

```kotlin
interface WorkflowStep {
    val id: String
    val name: String
    val description: String
    val after: List<String>
    val variables: Map<String, VariableReference>
    val config: StepConfig?
        get() = null
    val condition: (WorkflowContext) -> Boolean
        get() = { true }
    
    suspend fun execute(context: WorkflowContext): WorkflowStepResult
}
```

### WorkflowContext ✅

The `WorkflowContext` class holds the state and data during workflow execution:

```kotlin
data class WorkflowContext(
    val input: Map<String, Any?>,
    val steps: MutableMap<String, WorkflowStepResult> = mutableMapOf(),
    val variables: MutableMap<String, Any?> = mutableMapOf(),
    val runId: String? = null
) {
    fun resolveReference(reference: VariableReference): Any? {
        // Implementation...
    }
    
    fun resolveVariables(variables: Map<String, VariableReference>): Map<String, Any?> {
        // Implementation...
    }
}
```

### WorkflowEngine ✅

The `WorkflowEngine` class is responsible for executing workflows:

```kotlin
class WorkflowEngine(
    private val stateStorage: WorkflowStateStorage = InMemoryWorkflowStateStorage()
) {
    private val workflows = mutableMapOf<String, Workflow>()
    
    fun registerWorkflow(workflowId: String, workflow: Workflow) {
        workflows[workflowId] = workflow
    }
    
    suspend fun executeWorkflow(
        workflowId: String,
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): WorkflowResult {
        // Implementation...
    }
    
    suspend fun streamExecuteWorkflow(
        workflowId: String,
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): Flow<WorkflowStatusUpdate> {
        // Implementation...
    }
}
```

## Step Types ✅

Kastrax provides several built-in step types:

### Agent Step ✅

The agent step is the most common step type. It executes an agent with specific inputs:

```kotlin
step(agent) {
    id = "generate-content"
    name = "Generate Content"
    description = "Generate content based on a topic"
    variables = mutableMapOf(
        "topic" to variable("$.input.topic"),
        "style" to variable("$.input.style")
    )
}
```

### Function Step ✅

Executes a custom function:

```kotlin
functionStep {
    id = "process-data"
    name = "Process Data"
    description = "Process data from previous steps"
    after("collect-data")
    function { context ->
        val data = context.resolveReference(variable("$.steps.collect-data.output.data"))
        // Process the data; processData is a placeholder for your own function
        mapOf("processedData" to processData(data))
    }
}
```

### Conditional Step ✅

A conditional step is a regular step with a `condition` block. It runs only when the condition returns `true`:

```kotlin
step(agent) {
    id = "optional-step"
    name = "Optional Step"
    description = "This step only executes under certain conditions"
    after("previous-step")
    condition { context ->
        val quality = context.resolveReference(variable("$.steps.previous-step.output.quality"))
        ((quality as? Int) ?: 0) > 7
    }
    variables = mutableMapOf(
        "input" to variable("$.steps.previous-step.output.text")
    )
}
```
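
To show the skipping behavior in isolation, here is a minimal sketch in plain Kotlin. `SketchStep` and `stepsToRun` are hypothetical names invented for this illustration and are not part of the Kastrax API; the sketch only assumes that a step whose condition returns `false` is left out of the run.

```kotlin
// Hypothetical stand-in for a workflow step: an ID plus a condition over prior outputs.
data class SketchStep(
    val id: String,
    val condition: (Map<String, Any?>) -> Boolean = { true }
)

// Returns the IDs of the steps that would run, skipping those whose condition is false.
fun stepsToRun(steps: List<SketchStep>, outputs: Map<String, Any?>): List<String> =
    steps.filter { it.condition(outputs) }.map { it.id }

fun main() {
    val steps = listOf(
        SketchStep("previous-step"),
        SketchStep("optional-step") { outputs ->
            // A missing or non-integer quality falls back to 0, so the step is skipped.
            ((outputs["quality"] as? Int) ?: 0) > 7
        }
    )
    println(stepsToRun(steps, mapOf("quality" to 9))) // [previous-step, optional-step]
    println(stepsToRun(steps, mapOf("quality" to 5))) // [previous-step]
}
```

Falling back to `0` when the value is missing or has the wrong type means the optional step is skipped rather than failing on a bad input.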

### Subworkflow Step ✅

Executes another workflow as a step:

```kotlin
subworkflowStep {
    id = "nested-workflow"
    name = "Nested Workflow"
    description = "Execute a nested workflow"
    workflowId = "another-workflow"
    inputMapping { context ->
        mapOf(
            "data" to context.resolveReference(variable("$.steps.previous-step.output.data"))
        )
    }
}
```

## Data Flow ✅

Data flows through a workflow via variable references:

### Variable References ✅

Variable references use a JSON path-like syntax to access data:

```kotlin
// Reference workflow input
val topicRef = variable("$.input.topic")

// Reference output from a previous step
val researchRef = variable("$.steps.research.output.text")

// Reference a global variable
val configRef = variable("$.variables.config")
```
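
To make the path syntax concrete, the sketch below resolves a `$.`-prefixed dotted path against nested maps. It is not Kastrax's resolver: `resolvePath` and the map layout are assumptions made for illustration, and the real `resolveReference` may support more syntax or behave differently on missing keys.

```kotlin
// Hypothetical helper: resolves "$.a.b.c" against nested maps.
// Illustrates the path syntax only; this is not Kastrax's implementation.
fun resolvePath(root: Map<String, Any?>, path: String): Any? {
    require(path.startsWith("\$.")) { "Path must start with '\$.' but was: $path" }
    var current: Any? = root
    for (segment in path.removePrefix("\$.").split(".")) {
        // Walk one level down; a missing key or a non-map value yields null.
        current = (current as? Map<*, *>)?.get(segment)
        if (current == null) return null
    }
    return current
}

fun main() {
    val state = mapOf(
        "input" to mapOf("topic" to "Artificial Intelligence"),
        "steps" to mapOf(
            "research" to mapOf("output" to mapOf("text" to "Findings..."))
        )
    )
    println(resolvePath(state, "$.input.topic"))                // Artificial Intelligence
    println(resolvePath(state, "$.steps.research.output.text")) // Findings...
    println(resolvePath(state, "$.steps.missing.output.text"))  // null
}
```

In this sketch an unresolved path returns `null` instead of throwing, which is why the examples in this guide cast resolved values with `as?` and supply a default.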

### Output Mapping ✅

Output mapping defines how to extract and transform the final workflow output:

```kotlin
output {
    // Direct mapping
    "researchResults" from "$.steps.research.output.text"
    
    // Mapping with transformation
    "wordCount" from {
        "$.steps.writing.output.text" to { text ->
            (text as? String)?.split(" ")?.size ?: 0
        }
    }
    
    // Combining multiple sources
    "summary" from {
        "$.steps.research.output.text" to { research ->
            "$.steps.writing.output.text" to { article ->
                "Research: $research\n\nArticle: $article"
            }
        }
    }
}
```
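
The `wordCount` transformation above splits on a single space, so consecutive spaces produce empty tokens and newlines or tabs are not treated as separators. If that matters for your content, a whitespace-tolerant counter can be used inside the transformation instead. The `wordCount` function below is a plain-Kotlin sketch written for this guide, not a Kastrax utility.

```kotlin
// Counts whitespace-separated words; tolerant of repeated spaces, tabs, and newlines.
// Non-string input (including null) counts as zero words.
fun wordCount(text: Any?): Int =
    (text as? String)
        ?.trim()
        ?.split(Regex("\\s+"))
        ?.count { it.isNotEmpty() }
        ?: 0

fun main() {
    println("a  b".split(" ").size) // 3: the double space yields an empty token
    println(wordCount("a  b"))      // 2
    println(wordCount(null))        // 0
}
```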

## Error Handling ✅

Workflows can handle errors in several ways:

### Retry Configuration ✅

```kotlin
step(agent) {
    id = "unreliable-step"
    name = "Unreliable Step"
    description = "A step that might fail"
    
    // Configure retries
    config = StepConfig(
        retryConfig = RetryConfig(
            maxRetries = 3,
            retryDelay = 1000, // 1 second
            backoffMultiplier = 2.0 // Exponential backoff
        )
    )
}
```
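
As a worked example of what these settings could mean, the sketch below computes a retry schedule under one common interpretation of exponential backoff: the delay before retry *n* is `retryDelay * backoffMultiplier^(n - 1)`. This formula is an assumption, and `backoffDelays` is a hypothetical helper; check the `RetryConfig` implementation for the exact schedule Kastrax applies.

```kotlin
import kotlin.math.pow

// Assumed schedule: delay before retry n (1-based) is retryDelayMs * backoffMultiplier^(n - 1).
fun backoffDelays(maxRetries: Int, retryDelayMs: Long, backoffMultiplier: Double): List<Long> =
    (0 until maxRetries).map { attempt ->
        (retryDelayMs * backoffMultiplier.pow(attempt)).toLong()
    }

fun main() {
    // Same values as the RetryConfig above
    println(backoffDelays(maxRetries = 3, retryDelayMs = 1000, backoffMultiplier = 2.0))
    // [1000, 2000, 4000]
}
```

Under this assumption, the configuration above waits 1, 2, and 4 seconds, so a step that fails every attempt adds about 7 seconds of waiting before the failure is reported.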

### Error Handlers ✅

```kotlin
import java.util.concurrent.TimeoutException // assumed source of TimeoutException

// Create an error handling workflow engine
val workflowEngine = ErrorHandlingWorkflowEngine(
    errorHandler = object : ErrorHandler {
        override suspend fun handleStepError(
            workflowId: String,
            runId: String,
            stepId: String,
            error: Throwable
        ): ErrorHandlingAction {
            return when {
                error is TimeoutException -> ErrorHandlingAction.Retry(maxRetries = 3)
                stepId == "optional-step" -> ErrorHandlingAction.Skip
                else -> ErrorHandlingAction.Fail
            }
        }
    }
)
```
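
The order of the `when` branches in the handler matters. The sketch below reproduces the same decision logic with stand-in types so it can run on its own. `Action` and `decide` are hypothetical names for this illustration, and the real `ErrorHandlingAction` type may differ.

```kotlin
import java.util.concurrent.TimeoutException

// Stand-in for the ErrorHandlingAction type used above; the real type may differ.
sealed class Action {
    data class Retry(val maxRetries: Int) : Action()
    object Skip : Action()
    object Fail : Action()
}

// Same decision order as the handler above: error type first, then step ID, then fail.
fun decide(stepId: String, error: Throwable): Action = when {
    error is TimeoutException -> Action.Retry(maxRetries = 3)
    stepId == "optional-step" -> Action.Skip
    else -> Action.Fail
}

fun main() {
    // A timeout in the optional step is retried, because the first matching branch wins.
    println(decide("optional-step", TimeoutException("slow")) is Action.Retry)
    // Any other error in the optional step is skipped.
    println(decide("optional-step", IllegalStateException("bad")) === Action.Skip)
    // Errors elsewhere fail the workflow.
    println(decide("research", IllegalStateException("bad")) === Action.Fail)
}
```

If you would rather skip the optional step on any error, including timeouts, move the `stepId` branch above the `TimeoutException` branch.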

## Workflow Execution ✅

Workflows can be executed in different ways:

### Synchronous Execution ✅

`executeWorkflow` is a suspending function: it returns the final `WorkflowResult` once the workflow finishes, so call it from a coroutine (for example, inside `runBlocking`).

```kotlin
val result = workflowEngine.executeWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value")
)

println("Workflow completed with output: ${result.output}")
```

### Streaming Execution ✅

```kotlin
workflowEngine.streamExecuteWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value")
).collect { update ->
    when (update) {
        is WorkflowStatusUpdate.Started -> println("Workflow started")
        is WorkflowStatusUpdate.StepStarted -> println("Step started: ${update.stepId}")
        is WorkflowStatusUpdate.StepCompleted -> println("Step completed: ${update.stepId}")
        is WorkflowStatusUpdate.StepFailed -> println("Step failed: ${update.stepId}, error: ${update.error}")
        is WorkflowStatusUpdate.Completed -> println("Workflow completed")
        is WorkflowStatusUpdate.Failed -> println("Workflow failed: ${update.error}")
    }
}
```

### Execution Options ✅

```kotlin
val options = WorkflowExecuteOptions(
    maxSteps = 20, // Maximum number of steps to execute
    timeout = 120000, // 2 minutes timeout
    onStepFinish = { stepResult ->
        println("Step ${stepResult.stepId} finished with output: ${stepResult.output}")
    },
    onStepError = { stepId, error ->
        println("Step $stepId failed with error: ${error.message}")
    },
    threadId = "conversation-123" // Optional thread ID for memory integration
)

val result = workflowEngine.executeWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value"),
    options = options
)
```

## Workflow State Management ✅

Workflows maintain state during and after execution:

### State Storage ✅

```kotlin
// In-memory state storage (default)
val inMemoryStorage = InMemoryWorkflowStateStorage()

// File-based state storage
val fileStorage = FileWorkflowStateStorage("workflows/state")

// Database state storage
val dbStorage = DatabaseWorkflowStateStorage(dataSource)

// Create a workflow engine with custom state storage
val workflowEngine = WorkflowEngine(stateStorage = fileStorage)
```

### State Queries ✅

```kotlin
// Get workflow state
val state = workflowEngine.getWorkflowState("my-workflow", "run-123")

// Get all runs for a workflow
val runs = workflowEngine.getWorkflowRuns("my-workflow")

// Get active runs
val activeRuns = workflowEngine.getActiveWorkflowRuns()
```

## Advanced Workflow Features ✅

### Parallel Execution ✅

```kotlin
val parallelWorkflow = workflow("parallel-workflow", "Execute steps in parallel") {
    // These steps have no dependencies, so they can run in parallel
    step(agent1) {
        id = "step1"
        name = "Step 1"
        description = "First parallel step"
    }
    
    step(agent2) {
        id = "step2"
        name = "Step 2"
        description = "Second parallel step"
    }
    
    // This step depends on both parallel steps
    step(agent3) {
        id = "step3"
        name = "Step 3"
        description = "Final step after parallel execution"
        after("step1", "step2")
        variables = mutableMapOf(
            "result1" to variable("$.steps.step1.output.result"),
            "result2" to variable("$.steps.step2.output.result")
        )
    }
}
```
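
One way to reason about which steps can run together is to group them into "waves" based on their `after` dependencies. The sketch below does this in plain Kotlin. `executionWaves` is a hypothetical helper written for this guide, and it does not describe how `WorkflowEngine` schedules steps internally.

```kotlin
// Groups step IDs into waves: every step in a wave has all of its dependencies
// satisfied by earlier waves, so the steps within one wave could run concurrently.
// Illustrative only; the engine's actual scheduling may differ.
fun executionWaves(after: Map<String, List<String>>): List<List<String>> {
    val remaining = after.toMutableMap()
    val done = mutableSetOf<String>()
    val waves = mutableListOf<List<String>>()
    while (remaining.isNotEmpty()) {
        val ready = remaining.filterValues { deps -> done.containsAll(deps) }.keys.toList()
        // No ready step means a cycle or a dependency on an unknown step ID.
        check(ready.isNotEmpty()) { "Cycle or unknown dependency among: ${remaining.keys}" }
        waves.add(ready)
        done.addAll(ready)
        ready.forEach { remaining.remove(it) }
    }
    return waves
}

fun main() {
    val deps = mapOf(
        "step1" to emptyList<String>(),
        "step2" to emptyList<String>(),
        "step3" to listOf("step1", "step2")
    )
    println(executionWaves(deps)) // [[step1, step2], [step3]]
}
```

For the workflow above, `step1` and `step2` land in the first wave and `step3` in the second, which matches the comments in the example.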

### Workflow Composition ✅

```kotlin
// Create a workflow composer
val composer = WorkflowComposer("my-composer", workflowEngine)

// Compose workflows sequentially
val sequentialWorkflow = composer.composeSequential(
    workflowName = "sequential-workflow",
    description = "Execute workflows in sequence",
    workflows = listOf(
        "workflow1" to { input -> input }, // Input mapping for workflow1
        "workflow2" to { input, prevOutput -> prevOutput } // Use previous output as input
    )
)

// Compose workflows in parallel
val parallelWorkflow = composer.composeParallel(
    workflowName = "parallel-workflow",
    description = "Execute workflows in parallel",
    workflows = mapOf(
        "branch1" to "workflow1",
        "branch2" to "workflow2"
    ),
    inputMapping = { branch, input ->
        // Map input for each branch
        when (branch) {
            "branch1" -> mapOf("key1" to input["value"])
            "branch2" -> mapOf("key2" to input["value"])
            else -> emptyMap()
        }
    },
    mergeStep = MergeResultsStep(
        id = "merge",
        name = "Merge Results",
        description = "Merge results from parallel workflows"
    )
)
```

### State Machines ✅

For complex workflows with multiple possible paths, you can use state machines:

```kotlin
// Define states
enum class OrderState {
    NEW, PROCESSING, SHIPPED, DELIVERED, CANCELED
}

// Define events
sealed class OrderEvent {
    data class PlaceOrder(val items: List<String>) : OrderEvent()
    data class ProcessOrder(val paymentId: String) : OrderEvent()
    data class ShipOrder(val trackingNumber: String) : OrderEvent()
    data class DeliverOrder(val deliveryDate: String) : OrderEvent()
    object CancelOrder : OrderEvent()
}

// Create a state machine
val orderStateMachine = BasicStateMachine<OrderState, OrderEvent, Map<String, Any>>(
    initialState = OrderState.NEW,
    initialContext = emptyMap(),
    transitioner = object : StateTransitioner<OrderState, OrderEvent, Map<String, Any>> {
        override suspend fun transition(
            currentState: OrderState,
            event: OrderEvent,
            context: Map<String, Any>
        ): TransitionResult<OrderState, Map<String, Any>> {
            // Define state transitions. A `when` branch cannot combine a value
            // match with an `is` check in a pair, so each branch tests both explicitly.
            val (nextState, updatedContext) = when {
                currentState == OrderState.NEW && event is OrderEvent.PlaceOrder ->
                    OrderState.PROCESSING to context + ("items" to event.items)
                currentState == OrderState.PROCESSING && event is OrderEvent.ProcessOrder ->
                    OrderState.SHIPPED to context + ("paymentId" to event.paymentId)
                currentState == OrderState.SHIPPED && event is OrderEvent.ShipOrder ->
                    OrderState.DELIVERED to context + ("trackingNumber" to event.trackingNumber)
                currentState == OrderState.SHIPPED && event is OrderEvent.DeliverOrder ->
                    OrderState.DELIVERED to context + ("deliveryDate" to event.deliveryDate)
                // Cancellation is accepted from any state
                event is OrderEvent.CancelOrder -> OrderState.CANCELED to context
                // Any other combination leaves the state unchanged
                else -> currentState to context
            }
            
            return TransitionResult(
                nextState = nextState,
                updatedContext = updatedContext,
                sideEffects = emptyList()
            )
        }
    }
)

// Use the state machine
val nextState = orderStateMachine.sendEvent(OrderEvent.PlaceOrder(listOf("item1", "item2")))
println("Next state: $nextState")
```

## Complete Example ✅

The following example combines sequential steps, parallel steps, a conditional step, and output mapping in a single content-creation workflow:

```kotlin
import ai.kastrax.core.workflow.builder.workflow
import ai.kastrax.core.workflow.engine.WorkflowEngine
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create agents
    val researchAgent = agent {
        name("ResearchAgent")
        description("An agent that researches topics")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val writingAgent = agent {
        name("WritingAgent")
        description("An agent that writes content based on research")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val editingAgent = agent {
        name("EditingAgent")
        description("An agent that edits and improves content")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val factCheckingAgent = agent {
        name("FactCheckingAgent")
        description("An agent that verifies facts in content")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    // Create a content creation workflow
    val contentWorkflow = workflow("content-creation", "Create researched and edited content") {
        // Research step
        step(researchAgent) {
            id = "research"
            name = "Research"
            description = "Research the topic thoroughly"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic"),
                "depth" to variable("$.input.researchDepth")
            )
        }
        
        // Writing step
        step(writingAgent) {
            id = "writing"
            name = "Writing"
            description = "Write content based on research"
            after("research")
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text"),
                "style" to variable("$.input.contentStyle"),
                "length" to variable("$.input.contentLength")
            )
        }
        
        // Parallel fact checking and editing
        step(factCheckingAgent) {
            id = "fact-checking"
            name = "Fact Checking"
            description = "Verify facts in the content"
            after("writing")
            variables = mutableMapOf(
                "content" to variable("$.steps.writing.output.text"),
                "research" to variable("$.steps.research.output.text")
            )
        }
        
        step(editingAgent) {
            id = "editing"
            name = "Editing"
            description = "Edit and improve the content"
            after("writing")
            variables = mutableMapOf(
                "content" to variable("$.steps.writing.output.text"),
                "style" to variable("$.input.contentStyle")
            )
        }
        
        // Final revision step
        step(editingAgent) {
            id = "final-revision"
            name = "Final Revision"
            description = "Create the final version incorporating edits and fact checks"
            after("fact-checking", "editing")
            variables = mutableMapOf(
                "original" to variable("$.steps.writing.output.text"),
                "edits" to variable("$.steps.editing.output.text"),
                "factChecks" to variable("$.steps.fact-checking.output.text")
            )
        }
        
        // Conditional quality check step
        step(editingAgent) {
            id = "quality-check"
            name = "Quality Check"
            description = "Perform a quality check if requested"
            after("final-revision")
            condition { context ->
                val performQualityCheck = context.resolveReference(variable("$.input.performQualityCheck"))
                (performQualityCheck as? Boolean) ?: false
            }
            variables = mutableMapOf(
                "content" to variable("$.steps.final-revision.output.text")
            )
        }
        
        // Define output mapping
        output {
            "research" from "$.steps.research.output.text"
            "draft" from "$.steps.writing.output.text"
            "factChecks" from "$.steps.fact-checking.output.text"
            "edits" from "$.steps.editing.output.text"
            "finalContent" from "$.steps.final-revision.output.text"
            "qualityScore" from {
                "$.steps.quality-check.output.score" to { score ->
                    score ?: "No quality check performed"
                }
            }
            "metadata" from {
                "$.steps.writing.output.wordCount" to { wordCount ->
                    "$.steps.final-revision.output.wordCount" to { finalWordCount ->
                        mapOf(
                            "initialWordCount" to wordCount,
                            "finalWordCount" to finalWordCount,
                            "changePercentage" to calculatePercentage(wordCount, finalWordCount)
                        )
                    }
                }
            }
        }
    }
    
    // Create workflow engine
    val workflowEngine = WorkflowEngine()
    workflowEngine.registerWorkflow("content-creation", contentWorkflow)
    
    // Execute the workflow
    val result = workflowEngine.executeWorkflow(
        workflowId = "content-creation",
        input = mapOf(
            "topic" to "Artificial Intelligence Ethics",
            "researchDepth" to "comprehensive",
            "contentStyle" to "academic",
            "contentLength" to "2000 words",
            "performQualityCheck" to true
        )
    )
    
    // Print the results
    println("Workflow completed with status: ${if (result.success) "SUCCESS" else "FAILURE"}")
    println("Final content: ${result.output["finalContent"]}")
    println("Quality score: ${result.output["qualityScore"]}")
    println("Metadata: ${result.output["metadata"]}")
}

// Helper function to calculate percentage change
fun calculatePercentage(initial: Any?, final: Any?): Double {
    val initialValue = (initial as? Number)?.toDouble() ?: return 0.0
    val finalValue = (final as? Number)?.toDouble() ?: return 0.0
    
    if (initialValue == 0.0) return 0.0
    return ((finalValue - initialValue) / initialValue) * 100.0
}
```
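
To see what the `changePercentage` value in the metadata looks like, the snippet below repeats the `calculatePercentage` helper from the example so that it runs on its own, then calls it with a few sample word counts. The sample numbers are made up for illustration.

```kotlin
// Copy of the helper from the example above, repeated so this snippet is self-contained.
fun calculatePercentage(initial: Any?, final: Any?): Double {
    val initialValue = (initial as? Number)?.toDouble() ?: return 0.0
    val finalValue = (final as? Number)?.toDouble() ?: return 0.0

    if (initialValue == 0.0) return 0.0
    return ((finalValue - initialValue) / initialValue) * 100.0
}

fun main() {
    println(calculatePercentage(1000, 1500))  // 50.0 (content grew by half)
    println(calculatePercentage(1000, 500))   // -50.0 (content shrank by half)
    println(calculatePercentage("n/a", 1500)) // 0.0 (non-numeric input)
    println(calculatePercentage(0, 1500))     // 0.0 (avoids division by zero)
}
```

Note that the helper returns `0.0` both when nothing changed and when the change cannot be computed. If you need to tell those cases apart, consider returning a nullable `Double?` instead.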

## Best Practices ✅

1. **Step Granularity**: Size steps deliberately. Steps that are too small add orchestration overhead, and steps that are too large are harder to reuse, retry, and recombine
2. **Error Handling**: Implement robust error handling with retries for unreliable operations
3. **Data Flow**: Be explicit about data flow between steps using variable references
4. **Conditional Logic**: Use conditions to create dynamic workflows that adapt to intermediate results
5. **Monitoring**: Set up proper monitoring and logging for workflow execution
6. **State Management**: Choose appropriate state storage based on your reliability requirements
7. **Testing**: Test workflows with different inputs and edge cases
8. **Composition**: Compose complex workflows from simpler, reusable workflows

## Next Steps ✅

Now that you understand the workflow system, you can:

1. Learn about [workflow definition](./workflow-definition.mdx)
2. Explore [workflow execution](./workflow-execution.mdx)
3. Implement [agent integration with workflows](./agent-integration.mdx)
4. Set up [workflow state management](./state-management.mdx)
